-
Notifications
You must be signed in to change notification settings - Fork 272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Container GATs, improve traits #541
Conversation
15e1740
to
bb94b15
Compare
Signed-off-by: Moritz Hoffmann <[email protected]>
4801f8d
to
3cefd09
Compare
I reworked the PR to only contain a minimum of changes. This compiles with differential once applying the following patch: diff --git a/src/collection.rs b/src/collection.rs
index f2daf8ad..20cab0d2 100644
--- a/src/collection.rs
+++ b/src/collection.rs
@@ -416,10 +416,10 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
/// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
/// });
/// ```
- pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
+ pub fn inspect_batch<F>(&self, mut func: F) -> Collection<G, D, R>
where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static {
self.inner
- .inspect_batch(func)
+ .inspect_batch(move |time, batch| func(time, &batch[..]))
.as_collection()
}
/// Attaches a timely dataflow probe to the output of a Collection.
( |
ac6e383
to
f117d82
Compare
Containers are generic over their contents, and over reading and draining. Signed-off-by: Moritz Hoffmann <[email protected]>
Signed-off-by: Moritz Hoffmann <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good! I propose we merge this and start to see what it leads to downstream.
/// of all pieces must be equal to the length of the original container. When combining | ||
/// containers, the length of the result must be the sum of the individual parts. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At this point we do not know what it means to "combine a container".
type DrainIter<'a>: Iterator<Item=Self::Item<'a>>; | ||
|
||
/// Returns an iterator that drains the contents of this container. | ||
/// Drain leaves the container in an undefined state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could/should say more about what "undefined state" means. Perhaps "it is an error to do anything other than clear or drop" or " unless you are certain the type is FOO".
F: FnMut(usize, &mut Self); | ||
} | ||
|
||
impl<T: Clone + 'static> PushPartitioned for Vec<T> { | ||
impl<T: PushContainer + 'static> PushPartitioned for T where for<'a> T::Item<'a>: PushInto<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussion: might we want to make push_partitioned
into a free function generic in a T1: Container
and T2: PushContainer
, reading from the first and writing in to the second?
@@ -7,7 +7,7 @@ use crate::dataflow::operators::generic::operator::Operator; | |||
use crate::dataflow::{Scope, StreamCore}; | |||
|
|||
/// Exchange records between workers. | |||
pub trait Exchange<D> { | |||
pub trait Exchange<C: PushPartitioned> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From discussion: This could be C: Container
because it is only used to bind C::Item<'a>
.
Cleanup the
Container::Item
type, move it toPushPartitioned
and make it a GAT. Similarly, split theMap
trait in two (Map
,MapInPlace
) and support GATs in theMap
trait.This has the benefit that Timely becomes less opinionated about the interface provided by containers, for example by allowing non-owned data when calling
stream.map
.